"""Cluster management for Kubernetes-based integration tests."""
import contextlib
import copy
import datetime
import json
import os
import shutil
import subprocess
import time
import typing
import unittest
from urllib import parse

from kubernetes import client
from kubernetes import config
from kubernetes import dynamic
from kubernetes import watch

from . import test
from .. import logger
from .. import misc
from .. import yaml

LOGGER = logger.get_logger(__name__)

# Resource kinds that KubernetesCluster.k8s_namespace removes from a namespace
# when it runs its default cleanup.
DEFAULT_RESOURCES = (
    'ConfigMap',
    'Deployment',
    'Role',
    'RoleBinding',
    'Secret',
    'Service',
    'ServiceAccount',
)


def skip_without_requirements() -> typing.Callable[[typing.Any], typing.Any]:
    """Return a decorator that skips tests when required tools are missing.

    ``kind`` and ``docker`` are looked up in PATH, in that order. The first
    one that cannot be found results in a ``unittest.skip`` decorator naming
    it; if both are available, the returned decorator hands the decorated
    object back unchanged.
    """
    missing = next(
        (tool for tool in ('kind', 'docker') if shutil.which(tool) is None),
        None,
    )
    if missing is not None:
        return unittest.skip(f'Required "{missing}" was not found in PATH')

    def _passthrough(func: typing.Any) -> typing.Any:
        return func

    return _passthrough


class KubernetesCluster(test.TestCaseWithContext):
    """Kubernetes-based integration test.

    Provisions (or reuses) a kind cluster during class setup and provides
    class-level helpers to apply, wait for and delete Kubernetes resources.
    """

    # name of the kind cluster that is created and looked up
    cluster_name = 'cki'
    # (service name, service port, node port) triples: k8s_service exposes the
    # service port through the given NodePort, and the kind cluster config maps
    # the host port with the same number as the service port onto that node port
    known_ports = (
        ('rabbitmq',         5671, 30000),  # AMQP via TLS
        ('rabbitmq',         5672, 30001),  # AMQP
        ('rabbitmq',        15671, 30002),  # HTTP API via TLS
        ('rabbitmq',        15672, 30003),  # HTTP API
        ('rabbitmq',        61613, 30004),  # Stomp
        ('rabbitmq',        61614, 30005),  # Stomp via TLS
        ('vault',            8200, 30006),  # HTTP API
        ('mariadb',          3306, 30007),  # SQL
        ('minio',            9000, 30008),  # S3 API
        ('image-under-test', 8000, 30009),  # HTTP
        ('image-under-test', 8080, 30010),  # HTTP
        ('image-under-test', 8765, 30011),  # Metrics
        ('remote-responses', 7999, 30012),  # Responses
    )

    # host name of the cluster; _cluster sets it to 'localhost' or, when
    # DOCKER_HOST is set, to the host name taken from that URL
    hostname: str = ''
    # API clients; populated by _setup_access
    api_client: client.ApiClient = None
    dynamic_client: dynamic.client.DynamicClient = None
    core_v1: client.CoreV1Api = None

    # deployment name that k8s_deployment treats as the image under test, in
    # addition to deployments requested with image_under_test=True
    image_under_test: str | None = None

    @classmethod
    def setUpClass(cls) -> None:
        """Provision the cluster."""
        super().setUpClass()
        cls.enterClassContext(cls._cluster())

    @staticmethod
    def _is_ci() -> bool:
        return 'DOCKER_HOST' in os.environ

    @classmethod
    def _setup_access(cls) -> None:
        kubeconfig = subprocess.run(['kind', 'get', 'kubeconfig', '--name', cls.cluster_name],
                                    capture_output=True, encoding='utf8', check=True).stdout
        client_config = client.Configuration()
        config.load_kube_config_from_dict(yaml.load(contents=kubeconfig), persist_config=False,
                                          client_configuration=client_config)
        if cls._is_ci():
            server_url = parse.urlparse(client_config.host)
            server_url = server_url._replace(netloc=f'{cls.hostname}:{server_url.port}')
            client_config.host = server_url.geturl()
        cls.api_client = client.ApiClient(configuration=client_config)
        cls.dynamic_client = dynamic.client.DynamicClient(cls.api_client)
        cls.core_v1 = client.CoreV1Api(cls.api_client)

    @classmethod
    @contextlib.contextmanager
    def _cluster(cls) -> typing.Iterator[None]:
        kind_config: dict[str, typing.Any] = {
            'apiVersion': 'kind.x-k8s.io/v1alpha4', 'kind': 'Cluster',
            'name': cls.cluster_name,
            'nodes': [{
                'role': 'control-plane',
                'extraPortMappings': [
                    {'containerPort': n, 'hostPort': c} for _, c, n in cls.known_ports
                ],
                'extraMounts':  [] if cls._is_ci() else [{
                    'hostPath': '/',
                    'containerPath': '/host',
                    'readOnly': True,
                }],
            }],
        }
        if cls._is_ci():
            cls.hostname = typing.cast(str, parse.urlparse(os.environ['DOCKER_HOST']).hostname)
            # add to the apiServer certSANs the name of the docker (dind)
            # service in order to be able to reach the cluster through it
            cert_patch = [
                {'op': 'add', 'path': '/apiServer/certSANs/-', 'value': cls.hostname}]
            kind_config.update({
                'networking': {'apiServerAddress': '0.0.0.0', 'apiServerPort': 6443},
                'kubeadmConfigPatchesJSON6902': [{
                    'group': 'kubeadm.k8s.io',
                    'version': 'v1beta3',
                    'kind': 'ClusterConfiguration',
                    'patch': json.dumps(cert_patch),
                }],
            })
        else:
            cls.hostname = 'localhost'

        create = True
        clusters = subprocess.run(['kind', 'get', 'clusters'],
                                  capture_output=True, encoding='utf8', check=True).stdout
        if exists := (cls.cluster_name in clusters.strip().split('\n')):
            cls._setup_access()
            with misc.only_log_exceptions():
                data = cls.core_v1.read_namespaced_config_map('cluster-config', 'cluster-info')
                if yaml.load(contents=data.data['kind-config.yaml']) == kind_config:
                    LOGGER.info('Skipping cluster creation')
                    create = False

        if exists and create:
            LOGGER.info('Tearing down cluster "%s"', cls.cluster_name)
            subprocess.run(['kind', 'delete', 'cluster', '--name', cls.cluster_name], check=True)

        if create:
            LOGGER.info('Creating cluster "%s"', cls.cluster_name)
            subprocess.run(['kind', 'create', 'cluster', '--config', '-'],
                           input=json.dumps(kind_config), encoding='utf8', check=True)
            cls._setup_access()
            cls.core_v1.create_namespace(client.V1Namespace(
                metadata=client.V1ObjectMeta(name='cluster-info')))
            cls.k8s_apply(namespace='cluster-info', body={
                'apiVersion': 'v1', 'kind': 'ConfigMap',
                'metadata': {'name': 'cluster-config'},
                'data': {'kind-config.yaml': yaml.dump(kind_config)},
            })

        yield

    @classmethod
    @contextlib.contextmanager
    def k8s_namespace(cls, namespace: str) -> typing.Iterator[None]:
        """Create and clean up a k8s namespace."""
        try:
            cls.core_v1.read_namespace(namespace)
        except client.ApiException:
            LOGGER.info('Creating namespace "%s"', namespace)
            cls.core_v1.create_namespace(client.V1Namespace(
                metadata=client.V1ObjectMeta(name=namespace)))

        yield

        match os.environ.get('CKI_INTEGRATION_TESTS_CLEANUP'):
            case 'skip':
                LOGGER.warning('Skipping cleanup of resources in "%s"', namespace)
            case 'logs':
                resource = cls.dynamic_client.resources.get(kind='Pod')
                for pod in resource.get(namespace=namespace)['items']:
                    print(cls.core_v1.read_namespaced_pod_log(pod.metadata.name, namespace))
            case _:
                LOGGER.info('Deleting resources in "%s"', namespace)
                cls.k8s_delete_all(namespace, DEFAULT_RESOURCES)

    @classmethod
    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def k8s_deployment(
        cls,
        namespace: str,
        name: str,
        setup_at: datetime.datetime,
        container: typing.Any,
        volumes: list[dict[str, typing.Any]] | None = None,
        image_under_test: bool = False,
    ) -> None:
        """Apply a Deployment resource for a service."""
        container = copy.deepcopy(container)
        container['name'] = 'default'
        volumes = copy.deepcopy(volumes or [])
        if image_under_test or cls.image_under_test == name:
            if cls._is_ci():
                container['image'] = container['image'] + f':p-{os.environ["PARENT_PIPELINE_ID"]}'
                LOGGER.debug('Using image %s', container['image'])
            else:
                container['image'] = container['image'] + ':production'
                container.setdefault('volumeMounts', []).append(
                    {'name': 'code', 'mountPath': '/code', 'readOnly': True}
                )
                host_path = '/host' + os.getcwd()
                volumes += [{'name': 'code', 'hostPath': {'path': host_path, 'type': 'Directory'}}]
                LOGGER.debug('Using overlay of %s for /code', host_path)
        cls.k8s_apply(namespace=namespace, body={
            'apiVersion': 'v1', 'kind': 'ServiceAccount',
            'metadata': {'name': name},
        })
        cls.k8s_apply(namespace=namespace, body={
            'apiVersion': 'apps/v1', 'kind': 'Deployment',
            'metadata': {'name': name},
            'spec': {
                'strategy': {'type': 'Recreate'},
                'replicas': 1,
                'selector': {'matchLabels': {'app': name}},
                'template': {
                    'metadata': {
                        'labels': {'app': name},
                        'annotations': {'cki-project.org/setup-at': setup_at.isoformat()},
                    },
                    'spec': {
                        'containers': [container],
                        'serviceAccountName': name,
                        'volumes': volumes,
                    },
                },
            },
        })

    @classmethod
    def k8s_service(cls, namespace: str, name: str) -> None:
        """Apply a Service resource for a service."""
        cls.k8s_apply(namespace=namespace, body={
            'apiVersion': 'v1', 'kind': 'Service',
            'metadata': {'name': name},
            'spec': {
                'selector': {'app': name},
                'type': 'NodePort',
                'ports': list(
                        {'name': f'port-{c}', 'port': c, 'nodePort': n}
                        for d, c, n in cls.known_ports if d == name
                ),
            }
        })

    @classmethod
    def k8s_apply(cls, namespace: str, body: typing.Any) -> None:
        """Server-side apply a resource."""
        cls.dynamic_client.resources.get(kind=body['kind']).server_side_apply(
            namespace=namespace, field_manager='cki-integration-tests', body=body)

    @classmethod
    def k8s_wait(cls, namespace: str, name: str, setup_at: datetime.datetime) -> bool:
        """Wait for a deployment to become available."""
        if not cls._wait(namespace, 'Pod', 'Ready', [
            ('object|metadata|labels|app', name),
            ('object|metadata|annotations|cki-project.org/setup-at', setup_at.isoformat()),
        ]):
            return False
        if not cls._wait(namespace, 'Deployment', 'Available', [
            ('object|metadata|name', name),
        ]):
            return False
        time.sleep(1)  # give the Service magic a bit more time to settle to avoid port timeouts
        LOGGER.debug('Waited %s for %s to become ready', misc.now_tz_utc() - setup_at, name)
        return True

    @staticmethod
    def k8s_startup_probe(port: int) -> dict[str, typing.Any]:
        """Return a startupProbe configuration for a deployment."""
        return {
            'tcpSocket': {'port': port},
            'periodSeconds': 1,
            'failureThreshold': 60,
        }

    @classmethod
    def k8s_delete_all(cls, namespace: str, resources: typing.Iterable[str]) -> None:
        """Remove all resources in a certain namespace."""
        for kind in resources:
            resource = cls.dynamic_client.resources.get(kind=kind)
            for item in resource.get(namespace=namespace)['items']:
                resource.delete(namespace=namespace, name=item.metadata.name)

    @classmethod
    def _wait(cls, namespace: str, kind: str, condition: str,
              checks: list[tuple[str, str]]) -> bool:
        watcher = watch.Watch()
        LOGGER.debug('Waiting for %s', kind)
        for event in cls.dynamic_client.resources.get(kind=kind).watch(
                timeout=60, watcher=watcher, namespace=namespace):
            if not all(misc.get_nested_key(event, k, lookup_attrs=True, delimiter='|') == v
                       for k, v in checks):
                continue
            if any(c['type'] == condition and misc.strtobool(c['status'])
                   for c in (event['object'].status.conditions or [])):
                watcher.stop()
                return True
            # event.type: ADDED, MODIFIED, DELETED
            if event['type'] == 'DELETED':
                LOGGER.debug('deleted before it started')
                watcher.stop()
                return False
        LOGGER.debug('timeout')
        return False
